Inventory Events Channel (v1.0.0)
Central event stream for all inventory-related events including stock updates, allocations, and adjustments
Overview
The Inventory Events channel is the central stream for all inventory-related events across the system. This includes stock level changes, inventory allocations, adjustments, and stocktake events. Events for a specific SKU are guaranteed to be processed in sequence when using productId as the partition key.
Channel information
Address:
inventory.{env}.events
Protocol:
- kafka
Parameter | Description | Options | Default |
---|---|---|---|
env | Environment to use | dev, sit, prod | N/A |
Publishing and Subscribing to Events
Publishing Example
from kafka import KafkaProducerimport jsonfrom datetime import datetime
# Kafka configurationbootstrap_servers = ['localhost:9092']topic = f'inventory.{env}.events'
# Create a Kafka producerproducer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# Example inventory update eventinventory_event = { "eventType": "STOCK_LEVEL_CHANGED", "timestamp": datetime.utcnow().isoformat(), "version": "1.0", "payload": { "productId": "PROD-456", "locationId": "WH-123", "previousQuantity": 100, "newQuantity": 95, "changeReason": "ORDER_FULFILLED", "unitOfMeasure": "EACH", "batchInfo": { "batchId": "BATCH-789", "expiryDate": "2025-12-31" } }, "metadata": { "source": "warehouse_system", "correlationId": "inv-xyz-123", "userId": "john.doe" }}
# Send the message - using productId as key for partitioningproducer.send( topic, key=inventory_event['payload']['productId'].encode('utf-8'), value=inventory_event)producer.flush()
print(f"Inventory event sent to topic {topic}")
Subscription example
from kafka import KafkaConsumerimport jsonfrom datetime import datetime
class InventoryEventConsumer: def __init__(self): # Kafka configuration self.topic = f'inventory.{env}.events' self.consumer = KafkaConsumer( self.topic, bootstrap_servers=['localhost:9092'], group_id='inventory-processor-group', auto_offset_reset='earliest', enable_auto_commit=False, value_deserializer=lambda x: json.loads(x.decode('utf-8')), key_deserializer=lambda x: x.decode('utf-8') if x else None )
def process_event(self, event): """Process individual inventory events based on type""" event_type = event.get('eventType')
if event_type == 'STOCK_LEVEL_CHANGED': self.handle_stock_level_change(event) elif event_type == 'LOW_STOCK_ALERT': self.handle_low_stock_alert(event) # Add more event type handlers as needed
def handle_stock_level_change(self, event): """Handle stock level change events""" payload = event['payload'] print(f"Stock level change detected for product {payload['productId']}") print(f"New quantity: {payload['newQuantity']}") # Add your business logic here
def handle_low_stock_alert(self, event): """Handle low stock alert events""" payload = event['payload'] print(f"Low stock alert for product {payload['productId']}") print(f"Current quantity: {payload['currentQuantity']}") # Add your business logic here
def start_consuming(self): """Start consuming messages from the topic""" try: print(f"Starting consumption from topic: {self.topic}") for message in self.consumer: try: # Process the message event = message.value print(f"Received event: {event['eventType']} for product: {event['payload']['productId']}")
# Process the event self.process_event(event)
# Commit the offset after successful processing self.consumer.commit()
except Exception as e: print(f"Error processing message: {str(e)}") # Implement your error handling logic here # You might want to send to a DLQ (Dead Letter Queue)
except Exception as e: print(f"Consumer error: {str(e)}") finally: # Clean up self.consumer.close()
if __name__ == "__main__": # Create and start the consumer consumer = InventoryEventConsumer() consumer.start_consuming()